Multiprocess Learning (Ape-X)

Since version 9.4.2, the new classes MPReplayBuffer and MPPrioritizedReplayBuffer efficiently support multiprocess learning in the style of Ape-X (a single learner with multiple explorers) on a single machine.

1 Shared Memory

First of all, MPReplayBuffer and MPPrioritizedReplayBuffer map their internal data onto shared memory. This means you don’t need a proxy (e.g. multiprocessing.managers.SyncManager) or a queue (e.g. multiprocessing.Queue) for interprocess data sharing; you can simply access the buffer object from different processes.

from multiprocessing import Process
from cpprb import MPPrioritizedReplayBuffer

rb = MPPrioritizedReplayBuffer(100, {"obs": {}, "done": {}})

def explorer(rb):
    for _ in range(100):
        # Something ... (e.g. interact with the environment)
        obs, done = 0.0, 0.0  # placeholder values so this snippet runs
        rb.add(obs=obs, done=done)

p = Process(target=explorer, args=[rb])  # You can simply pass the buffer to Process as an argument
p.start()
p.join()

sample = rb.sample(10)  # You can access data stored by a different process.

2 Efficient Lock

Although you can implement Ape-X with the ordinary ReplayBuffer or PrioritizedReplayBuffer classes, locking the entire buffer on every write and read is quite inefficient.

# Part of a naive explorer implementation
if local_buffer.get_stored_size() > local_size:
    local_sample = local_buffer.get_all_transitions()

    with lock: # Inefficient: locks the entire buffer during addition
        global_buffer.add(**local_sample)

MPReplayBuffer and MPPrioritizedReplayBuffer automatically lock only the critical sections instead of the entire buffer. For example, since successive add calls write to different memory addresses, the critical section covers only fetching and incrementing the write index. This reduced locking allows multiple explorers to add transitions in parallel.1
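To make the idea concrete, here is a minimal, self-contained sketch (illustrative only, not cpprb’s actual implementation; all names are made up): the lock is held only while fetching and incrementing the shared write index, and the copy into shared memory happens outside the lock because each writer owns a distinct slot.

from multiprocessing import Process, Lock, Value, Array

BUFFER_SIZE = 100

def add(storage, next_index, lock, value):
    # Critical section: only fetching and incrementing the write index.
    with lock:
        idx = next_index.value
        next_index.value = (idx + 1) % BUFFER_SIZE
    # Outside the lock: each writer fills its own reserved slot,
    # so parallel writers never touch the same address.
    storage[idx] = value

if __name__ == "__main__":
    storage = Array("d", BUFFER_SIZE, lock=False)  # shared payload, no implicit lock
    next_index = Value("i", 0, lock=False)         # shared write index
    lock = Lock()                                  # guards only the index update

    ps = [Process(target=add, args=(storage, next_index, lock, float(i)))
          for i in range(4)]
    for p in ps:
        p.start()
    for p in ps:
        p.join()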

We adopt an exclusive-read concurrent-write model for access control. Multiple writers are allowed to work in parallel, and the number of writers inside the critical section is tracked atomically. Reading has higher priority and prevents writers (a.k.a. actors) from entering the critical section again. When all writers have exited the critical section, the reader (a.k.a. learner) starts working in it.

We restrict the number of learners to 1. If we allowed multiple learners, which have higher priority, actors might never enter the critical section, which is undesirable for reinforcement learning.
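The access control described above can be sketched in pure Python as follows (a simplified illustration with made-up names; cpprb implements this internally with atomic operations, not with this class):

from multiprocessing import Condition, Value

class ReadExclusiveLock:
    def __init__(self):
        self._cond = Condition()
        self._writers = Value("i", 0, lock=False)         # writers inside the critical section
        self._reader_waiting = Value("b", 0, lock=False)  # reader-pending flag

    def writer_enter(self):
        with self._cond:
            # Reading has priority: new writers wait while the reader is pending.
            while self._reader_waiting.value:
                self._cond.wait()
            self._writers.value += 1  # multiple writers may be inside at once

    def writer_exit(self):
        with self._cond:
            self._writers.value -= 1
            if self._writers.value == 0:
                self._cond.notify_all()  # last writer out lets the reader proceed

    def reader_enter(self):
        with self._cond:
            self._reader_waiting.value = 1  # block new writers from entering
            while self._writers.value > 0:  # wait until current writers drain
                self._cond.wait()

    def reader_exit(self):
        with self._cond:
            self._reader_waiting.value = 0
            self._cond.notify_all()         # writers may enter again

In this sketch, actors would wrap each add between writer_enter / writer_exit, and the single learner would wrap sample between reader_enter / reader_exit.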

3 Limitation

MPReplayBuffer and MPPrioritizedReplayBuffer don’t support the Nstep Experience Replay, Memory Compression, and Map Data on File features. (You can still utilize these features in the explorers’ local buffers, as sketched below.)
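For example, an explorer can still apply Nstep Experience Replay in its process-local buffer and merge the resulting transitions into the shared buffer. A minimal sketch (the Nstep parameter values are only examples):

from cpprb import ReplayBuffer, MPPrioritizedReplayBuffer

env_dict = {"obs": {"shape": 4}, "act": {}, "rew": {},
            "next_obs": {"shape": 4}, "done": {}}

global_rb = MPPrioritizedReplayBuffer(int(1e+6), env_dict)

# The local buffer lives in a single explorer process,
# so it can use Nstep even though the shared buffer cannot.
local_rb = ReplayBuffer(100, env_dict,
                        Nstep={"size": 4, "gamma": 0.99,
                               "rew": "rew", "next": "next_obs"})

# ... the explorer fills local_rb, then flushes into the shared buffer:
# global_rb.add(**local_rb.get_all_transitions())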

MPReplayBuffer and MPPrioritizedReplayBuffer assume a single learner (sample / update_priorities) and multiple explorers (add). You must not call the learner methods from multiple processes simultaneously.

4 Example Code

from multiprocessing import Process, Event, SimpleQueue
import time

import gym
import numpy as np
from tqdm import tqdm

from cpprb import ReplayBuffer, MPPrioritizedReplayBuffer

class MyModel:
    def __init__(self):
        self._weights = 0

    def get_action(self,obs):
        # Implement action selection
        return 0

    def abs_TD_error(self,sample):
        # Implement absolute TD error
        return np.zeros(sample["obs"].shape[0])

    @property
    def weights(self):
        return self._weights

    @weights.setter
    def weights(self,w):
        self._weights = w

    def train(self,sample):
        # Implement model update
        pass

def explorer(global_rb,env_dict,is_training_done,queue):
    local_buffer_size = int(1e+2)
    local_rb = ReplayBuffer(local_buffer_size,env_dict)

    model = MyModel()
    env = gym.make("CartPole-v1")

    obs = env.reset()
    while not is_training_done.is_set():
        if not queue.empty():
            w = queue.get()
            model.weights = w

        action = model.get_action(obs)
        next_obs, reward, done, _ = env.step(action)

        local_rb.add(obs=obs,act=action,rew=reward,next_obs=next_obs,done=done)

        if done:
            obs = env.reset()
        else:
            obs = next_obs

        if local_rb.get_stored_size() == local_buffer_size:
            local_sample = local_rb.get_all_transitions()
            local_rb.clear()

            absTD = model.abs_TD_error(local_sample)
            global_rb.add(**local_sample,priorities=absTD)

def learner(global_rb,queues):
    batch_size = 64
    n_warmup = 100
    n_training_step = int(1e+4)
    explorer_update_freq = 100

    model = MyModel()

    while global_rb.get_stored_size() < n_warmup:
        time.sleep(1)

    for step in tqdm(range(n_training_step)):
        sample = global_rb.sample(batch_size)

        model.train(sample)
        absTD = model.abs_TD_error(sample)
        global_rb.update_priorities(sample["indexes"],absTD)

        if step % explorer_update_freq == 0:
            w = model.weights
            for q in queues:
                q.put(w)

if __name__ == "__main__":
    buffer_size = int(1e+6)
    env_dict = {"obs": {"shape": 4},
                "act": {},
                "rew": {},
                "next_obs": {"shape": 4},
                "done": {}}
    n_explorer = 4

    global_rb = MPPrioritizedReplayBuffer(buffer_size,env_dict)

    is_training_done = Event()

    qs = [SimpleQueue() for _ in range(n_explorer)]
    ps = [Process(target=explorer,
                  args=[global_rb,env_dict,is_training_done,q])
          for q in qs]

    for p in ps:
        p.start()

    learner(global_rb,qs)

    is_training_done.set()

    for p in ps:
        p.join()

  1. Updating the segment tree for PER is a critical section, too. To avoid data races, MPPrioritizedReplayBuffer lazily updates the segment tree from the learner process just before the sample method. ↩︎